/*
                    .::::.
                  .::::::::.
                 :::::::::::
             ..:::::::::::'	  Blessed by the Goddess,
           '::::::::::::'		may this code be forever bug-free
             .::::::::::
        '::::::::::::::..
             ..::::::::::::.
           ``::::::::::::::::
            ::::``:::::::::'        .:::.
           ::::'   ':::::'       .::::::::.
         .::::'      ::::     .:::::::'::::.
        .:::'       :::::  .:::::::::' ':::::.
       .::'        :::::.:::::::::'      ':::::.
      .::'         ::::::::::::::'         ``::::.
  ...:::           ::::::::::::'              ``::.
 ```` ':.          ':::::::::'                  ::::..
                    '.:::::'                    ':'````..
                    
 ━━━━━━━━━━━━━━━━━━━━ May the Goddess bless: no bugs, ever ━━━━━━━━━━━━━━━━━━━━
*/
import com.alibaba.fastjson.{JSON, JSONObject}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBs

/**
  * Spark Streaming direct-Kafka consumer that persists its offsets to MySQL.
  */
/**
  * Spark Streaming job that consumes recharge notifications from Kafka with
  * the direct (receiver-less) API and manages consumer offsets itself,
  * persisting them to MySQL (table `offsets`) so that after a restart the job
  * resumes from exactly where it stopped.
  *
  * Per micro-batch it runs four analytics over the recharge events:
  *   1. global order / amount / success counters (Redis);
  *   2. failed-recharge count per province per hour (MySQL);
  *   3. top-10 provinces by order volume, with success rate (MySQL);
  *   4. recharge count and amount per hour (Redis + MySQL).
  */
object SparkStreamingOffsetMysql {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ssom").setMaster("local[2]")
    // Rate guard: consume at most 100 records per partition per second.
    conf.set("spark.streaming.kafka.maxRatePerPartition", "100")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Province dimension data as (provinceCode, provinceName) pairs.
    val proInfo = ssc.sparkContext.textFile("dir/city/city.txt").map { line =>
      val fields = line.split(" ")
      (fields(0), fields(1))
    }

    // Kafka consumer settings.
    val groupid = "gp0123"
    val brokerList = "192.168.137.201:9092,192.168.137.203:9092,192.168.137.204:9092"
    val topic = "recharge"
    // More topics could be subscribed by adding them to this set.
    val topics = Set(topic)
    val kafkas = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> groupid,
      // Start from the earliest retained offset when nothing is stored yet.
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )

    // Initialise the scalikejdbc connection pool from the application config.
    DBs.setup()

    // Offsets previously committed by this consumer group, read from MySQL
    // instead of ZooKeeper.
    val fromdbOffset: Map[TopicAndPartition, Long] =
      DB.readOnly { implicit session =>
        // Parameterized query instead of string interpolation (injection-safe).
        SQL("select * from offsets where groupId = ?")
          .bind(groupid)
          .map(m => (TopicAndPartition(m.string("topic"), m.int("partitions")), m.long("untilOffset")))
          .list()
          .apply()
      }.toMap

    // Build the input stream: a first run starts per `auto.offset.reset`,
    // a restart resumes from the offsets stored in MySQL.
    val kafkaStream: InputDStream[(String, String)] =
      if (fromdbOffset.isEmpty) {
        // First start: no stored offsets yet.
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkas, topics)
      } else {
        // Restart: validate the stored offsets against what Kafka still retains.
        val kafkaCluster = new KafkaCluster(kafkas)
        val earliestOffsets: Either[Err, Map[TopicAndPartition, KafkaCluster.LeaderOffset]] =
          kafkaCluster.getEarliestLeaderOffsets(fromdbOffset.keySet)
        // If Kafka's retention already purged data up to our stored offset,
        // fall forward to the earliest still-available offset (avoids an
        // OffsetOutOfRange failure); otherwise keep the stored offset — i.e.
        // take the larger of the two so no record is replayed.
        val checkOffset: Map[TopicAndPartition, Long] = earliestOffsets.fold(
          _ => Map.empty[TopicAndPartition, Long],
          leaderOffsets => fromdbOffset.map { case (tp, dbOffset) =>
            tp -> math.max(dbOffset, leaderOffsets(tp).offset)
          }
        )
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
          ssc, kafkas, checkOffset, messageHandler)
      }

    // Process each micro-batch.
    kafkaStream.foreachRDD { kafkaRDD =>
      // BUGFIX: the original guard `if(!kafkaRDD.isEmpty)` was missing its
      // opening brace, leaving the file's braces unbalanced (a compile
      // error); the empty-batch check now wraps the whole batch handler.
      if (!kafkaRDD.isEmpty()) {
        // Grab the offset ranges up front; they are committed only after the
        // batch has been fully processed (at-least-once semantics).
        val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

        // Requirement 1: parse + filter events, update global Redis counters.
        val filtered = rechargeNotice(ssc.sparkContext, kafkaRDD)
        println("充值订单数：" + RedisUtils.getData("ChargeCount"))
        println("充值金额" + RedisUtils.getData("ChargefeeSum"))
        println("成功订单数：" + RedisUtils.getData("RstSuccess"))
        println()
        // The filtered RDD feeds the three jobs below; cache it once.
        filtered.cache()

        // Requirement 2: failed-recharge count per province per hour.
        provinceHourFailCount(filtered, proInfo)

        // Requirement 3: top-10 provinces by order volume, with success rate.
        provinceCountTop10(ssc.sparkContext, proInfo, filtered)

        // Requirement 4: recharge count and amount per hour.
        hourChargefeeCount(filtered)

        println("------------------------------------------------------------")

        // Commit the processed offsets to MySQL in a single transaction.
        DB.localTx { implicit session =>
          for (os <- offsetRanges) {
            // `replace into` upserts, so each (group, topic, partition) row
            // is overwritten in place with the new untilOffset.
            SQL("replace into offsets(groupId,topic,partitions,untilOffset) values(?,?,?,?)")
              .bind(groupid, os.topic, os.partition, os.untilOffset)
              .update().apply()
          }
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Requirement 4: per-hour recharge count and total recharge amount.
    *
    * Aggregates the batch by hour (`yyyyMMdd_HH`, derived from the
    * notification receive time), merges the result with the running totals
    * kept in Redis under that hour key (stored as `"count_amount"`), and
    * upserts the merged totals into MySQL table `hour_chargefee_count`.
    *
    * @param filtered RDD of (serviceName, (rst, chargefee, startTime, endTime, provinceCode))
    */
  def hourChargefeeCount(filtered: RDD[(String, (String, Long, String, String, String))]) = {
    val hourly: RDD[(String, (Long, Long))] = filtered
      .map { case (_, (rst, chargefee, _, endTime, _)) =>
        // Bucket by the hour of the notification time: yyyyMMdd_HH.
        val hourKey = DateUtils.getDateHour(endTime)
        // Only successful recharges ("0000") contribute to the amount.
        val fee = if (rst == "0000") chargefee else 0L
        (hourKey, (1L, fee))
      }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

    hourly.foreachPartition { records =>
      records.foreach { case (hourKey, (batchCount, batchFee)) =>
        // NOTE(review): this read-modify-write on Redis is not atomic across
        // executors; concurrent updates to the same hour key could lose
        // increments — confirm each hour key lands in a single partition.
        val previous = RedisUtils.getData(hourKey)
        val (count, chargefee) =
          if (previous != null) {
            // Accumulate on top of the totals already stored in Redis.
            val parts = previous.split("_")
            (parts(0).toLong + batchCount, parts(1).toLong + batchFee)
          } else {
            (batchCount, batchFee)
          }
        RedisUtils.setData(hourKey, count + "_" + chargefee)

        // Keep MySQL in sync with the Redis running totals.
        DB.localTx { implicit session =>
          SQL("replace into hour_chargefee_count(time,count,chargefee) values(?,?,?)")
            .bind(hourKey, count, chargefee)
            .update().apply()
        }
      }
    }
  }

  /**
    * Requirement 3: top-10 provinces by total order volume, plus each
    * province's order success rate (persisted to `province_count_top10`).
    *
    * Historical totals are read from MySQL, merged with this batch's counts,
    * written back for provinces the batch touched, and the merged totals are
    * sorted to print the top ten.
    *
    * @param sc       active SparkContext (used to parallelize the DB rows)
    * @param proInfo  (provinceCode, provinceName) dimension RDD
    * @param filtered RDD of (serviceName, (rst, chargefee, startTime, endTime, provinceCode))
    */
  def provinceCountTop10(sc: SparkContext, proInfo: RDD[(String, String)], filtered: RDD[(String, (String, Long, String, String, String))]) = {
    // BUGFIX: the original mapped each row to (count, rstSuccess) but later
    // added `data._1` to the SUCCESS total and `data._2` to the order COUNT,
    // swapping the two counters after the first persisted batch. Rows are now
    // read as (rstSuccess, count) to match how they are consumed below.
    val history: List[(String, (String, String))] = DB.readOnly { implicit session =>
      SQL("select provinceCode,count,rstSuccess from province_count_top10")
        .map(rs => (rs.string("provinceCode"), (rs.string("rstSuccess"), rs.string("count"))))
        .list().apply()
    }

    // On the very first run the table is empty: seed every known province
    // with zero counters so the join below still covers all provinces.
    var firstRun = false
    val dataRDD: RDD[(String, (String, String))] =
      if (history.nonEmpty) sc.parallelize(history)
      else { firstRun = true; proInfo.map { case (code, _) => (code, ("0", "0")) } }

    // Per-batch aggregation keyed by province: (successInBatch, totalInBatch).
    val batchRDD = filtered
      .map { case (_, (rst, _, _, _, provinceCode)) =>
        if (rst == "0000") (provinceCode, (1L, 1L)) else (provinceCode, (0L, 1L))
      }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

    // Left-join history with this batch so provinces absent from the batch
    // keep their historical totals and still compete for the top ten.
    val top10: Array[(String, (Long, Long))] = dataRDD.leftOuterJoin(batchRDD).map { tup =>
      val (histSuccess, histCount) = tup._2._1
      val (batchSuccess, batchCount) = tup._2._2.getOrElse((0L, 0L))
      val rstSuccess = histSuccess.toLong + batchSuccess
      val count = histCount.toLong + batchCount
      // Guard against 0/0 = NaN on the first run for provinces with no orders.
      val successRate = NumberUtils.formatDouble(if (count == 0L) 0.0 else rstSuccess.toDouble / count, 3)

      // Only write back when the batch touched this province, or on first
      // run (to seed the table).
      if (batchSuccess != 0L || batchCount != 0L || firstRun) {
        DB.autoCommit { implicit session =>
          SQL("replace into province_count_top10(provinceCode,rstSuccess,count,successRate) values(?,?,?,?)")
            .bind(tup._1, rstSuccess, count, successRate).update().apply()
        }
      }
      (tup._1, (rstSuccess, count)) // (provinceCode, (successTotal, orderTotal))
    }.sortBy(_._2._2, ascending = false).take(10) // order totals, descending

    for ((code, (success, total)) <- top10) {
      println(s"省份编号：${code} ,成功订单数：${success} ,订单总数：${total} ,成功率：${NumberUtils.formatDouble(if (total == 0L) 0.0 else success / total.toDouble, 3)}")
    }
  }

  /**
    * Requirement 2: count failed recharges per province per hour and upsert
    * the running totals into MySQL table `province_hour_fail_count`.
    *
    * @param filtered RDD of (serviceName, (rst, chargefee, startTime, endTime, provinceCode))
    * @param proInfo  (provinceCode, provinceName) dimension RDD
    */
  def provinceHourFailCount(filtered: RDD[(String, (String, Long, String, String, String))], proInfo: RDD[(String, String)]) = {
    filtered
      // Keep only failed recharges (any result code other than "0000").
      .filter(_._2._1 != "0000")
      .map { case (_, (_, _, _, endTime, provinceCode)) =>
        // Aggregation key format: yyyyMMdd_HH_provinceCode.
        (DateUtils.getDateHour(endTime) + "_" + provinceCode, 1L)
      }
      .reduceByKey(_ + _)
      // Re-key by province code so the province name can be joined in.
      .map(kv => (kv._1.split("_")(2), kv))
      .leftOuterJoin(proInfo)
      .map { case (provinceCode, ((key, failCount), provinceNameOpt)) =>
        val province = provinceNameOpt.getOrElse(null) // null when code unknown
        val Array(date, hour, _) = key.split("_")

        // Add the failures already persisted for this (province, date, hour).
        val existing: List[String] = DB.readOnly { implicit session =>
          SQL("select count from province_hour_fail_count where provinceCode=? AND date=? AND hour=?")
            .bind(provinceCode, date, hour)
            .map(rs => rs.string("count")).list().apply()
        }
        val total = existing.map(_.toLong).sum + failCount

        // Upsert the merged total.
        DB.autoCommit { implicit session =>
          SQL("replace into province_hour_fail_count(provinceCode, province,date,hour,count) values(?,?,?,?,?)")
            .bind(provinceCode, province, date, hour, total).update().apply()
        }
      }
      .count() // count() is an action: it forces the lazy pipeline to run
  }

  /**
    * Requirement 1: parse the raw Kafka JSON records, keep only
    * `reChargeNotifyReq` events, and maintain the global Redis counters
    * (order count, total amount, successful-order count).
    *
    * @return RDD of (serviceName, (rst, chargefee, startTime, endTime, provinceCode))
    */
  def rechargeNotice(sc: SparkContext, kafkaRDD: RDD[(String, String)]) = {
    val filtered = kafkaRDD
      .map { case (_, value) =>
        val json: JSONObject = JSON.parseObject(value) // one JSON record
        val name = json.getString("serviceName")       // event name
        val rst = json.getString("bussinessRst")       // recharge result code
        val chargefee = json.getString("chargefee").toLong // recharge amount
        // requestId is assumed to start with a 17-char timestamp — TODO confirm.
        val startTime = json.getString("requestId").substring(0, 17)
        val endTime = json.getString("receiveNotifyTime") // completion time
        val proCode = json.getString("provinceCode")      // province code
        (name, (rst, chargefee, startTime, endTime, proCode))
      }
      .filter(_._1 == "reChargeNotifyReq")

    filtered.foreachPartition { records =>
      // Seed the running totals from Redis (0 when a key is absent).
      def counter(key: String): Long = {
        val v = RedisUtils.getData(key)
        if (v != null) v.toLong else 0L
      }
      // NOTE(review): counters are read once per partition and written per
      // record, so concurrent partitions can overwrite each other's updates.
      var chargeCount = counter("ChargeCount")
      var chargefeeSum = counter("ChargefeeSum")
      var rstSuccess = counter("RstSuccess")
      records.foreach { case (_, (rst, chargefee, _, _, _)) =>
        chargeCount += 1L
        RedisUtils.setData("ChargeCount", chargeCount + "")
        if (rst == "0000") { // "0000" means the recharge succeeded
          rstSuccess += 1L
          RedisUtils.setData("RstSuccess", rstSuccess + "")
          chargefeeSum += chargefee
          RedisUtils.setData("ChargefeeSum", chargefeeSum + "")
        }
      }
    }
    filtered
  }

}
